package org.developer.orderpay_detect

import java.util

import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

//定义输入订单事件的样例类
case class OrderEvent(orderId: Long, eventType: String, txId: String, eventTime: Long)
//定义输出结果样例类
case class OrderResult( orderId: Long, resultMsg: String)

object OrderTimeout {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    //读取订单数据
    val resource = getClass.getResource("/OrderLog.csv")
    val orderEventStream = env.readTextFile(resource.getPath)
      .map( data => {
        val dataArray = data.split(",")
        OrderEvent( dataArray(0).trim.toLong,dataArray(1).trim,dataArray(2).trim,dataArray(3).trim.toLong)
      })
      .assignAscendingTimestamps(_.eventTime * 1000L)
      .keyBy(_.orderId)

    //2. 定义匹配模式
    val orderPayPattern = Pattern.begin[OrderEvent]("begin").where(_.eventType =="create")
      .followedBy("follow").where(_.eventType == "pay")
      .within(Time.minutes(15))


    //3. 把模式应用到 stream，得到一个 pattern stream
    val patternStream = CEP.pattern(orderEventStream,orderPayPattern)

    //4.调用select 方法，提取事件序列,超时的事件要做报警提示
    val orderTimeoutOutputTag = new OutputTag[OrderResult]("orderTimeout")

//    val resultStream = patternStream.select( orderTimeoutOutputTag,new OrderTimeoutSelect(),new OrderPaySelect())

    val resultStream = patternStream.select( orderTimeoutOutputTag,
      new OrderTimeoutSelect(),
      new OrderPaySelect())

    resultStream.print()

    resultStream.getSideOutput(orderTimeoutOutputTag).print("timeout")

    resultStream.print()

    resultStream.getSideOutput(orderTimeoutOutputTag).print("timeout")

    env.execute("order timeout job")
  }
}

//自定义超时事件序列处理函数
class OrderTimeoutSelect() extends PatternTimeoutFunction[OrderEvent,OrderResult]{
  override def timeout(map: util.Map[String, util.List[OrderEvent]], l: Long): OrderResult = {

    val timeoutOrderId = map.get("begin").iterator().next().orderId

    OrderResult(timeoutOrderId,"timeout")
  }
}

//自定义正常支付事件序列处理函数
class OrderPaySelect() extends PatternSelectFunction[OrderEvent,OrderResult]{
  override def select(map: util.Map[String, util.List[OrderEvent]]): OrderResult = {

    val payOrderId = map.get("follow").iterator().next().orderId
    OrderResult( payOrderId,"payed successful")
  }
}
